"""
-*- coding: utf-8 -*-
@Author  : Link
@Time    : 2022/12/20 14:15
@Site    : 
@File    : capability.py
@Software: PyCharm
@Remark  : 
"""
from typing import List, Tuple

import numpy as np
import pandas as pd

from app_test.test_utils.wrapper_utils import Time
from common.app_variable import PtmdModule, LimitType, DataModule, DatatType, Calculation, FailFlag
from common.data_class_interface.for_analysis_stdf import DTP_HEAD, PRR_HEAD, BIN_HEAD
from parser_core.stdf_parser_func import PtmdOptFlag, DtpTestFlag, PtmdParmFlag
from ui_component.ui_app_variable import UiGlobalVariable


class CapabilityUtils:
    """
    Capability / Top-Fail calculation helpers.

    Important:
        values persisted to the database must NEVER be rounded; rounding
        here is for display purposes only.
    """

    @staticmethod
    # @Time()
    def top_fail(top_fail_df: pd.DataFrame, data_df: pd.DataFrame) -> Tuple[pd.DataFrame, int, pd.Index]:
        """
        Count failures of one test item directly from the raw data set.

        Operates on the "still passing so far" population: parts that already
        failed an earlier item have been removed from ``top_fail_df``, so every
        failure found here is attributed to this item (first-fail accounting).
        We count how many FAILED here, not how many passed.

        Note: time-costly on wide frames (60k rows x 40 columns ~ 800ms; the
        column count dominates).

        :param top_fail_df: parts still passing (columns like
                            ["PART_ID", "FAIL_FLAG"]), indexed by part id
        :param data_df: DTP data for ONE test item, index-aligned with
                        ``top_fail_df``
        :return: (remaining passing parts, fail qty of this item,
                  index of the failing rows)
        """
        all_qty = len(top_fail_df)
        # Restrict to parts that are still in the running.
        temp_data_df = data_df[data_df.index.isin(top_fail_df.index)]
        fail_df = temp_data_df[temp_data_df.FAIL_FLG == FailFlag.FAIL]
        fail_qty = len(fail_df)
        # Drop the parts that failed here so later items cannot count them again.
        top_fail_df = top_fail_df[~top_fail_df.index.isin(fail_df.index)]
        if len(top_fail_df) > all_qty:
            # Sanity check: filtering can only shrink the population.
            raise Exception("error len(top_fail_df) > all_qty")
        return top_fail_df, fail_qty, fail_df.index

    @staticmethod
    @Time()
    def calculation_top_fail(df_module: DataModule):
        """
        Compute Top-Fail statistics: walk the test items in order and count,
        per item, how many parts fail it *first*.

        Steps:
            1. duplicate data from multiple files is assumed removed upstream
            2. pull each item's data and run the first-fail accounting
        :param df_module: bundle holding prr_df / dtp_df / ptmd_df
        :return: {TEST_ID: first-fail qty}
        """
        remaining_parts = df_module.prr_df
        dtp_df = df_module.dtp_df
        top_fail_dict: dict = {}
        # Per-item Top-Fail accounting, in ptmd order.
        for row in df_module.ptmd_df.itertuples():  # type:PtmdModule
            remaining_parts, fail_qty, fail_index = CapabilityUtils.top_fail(
                remaining_parts,
                dtp_df.loc[row.TEST_ID],
            )
            # Record which item each failing part first failed on.
            df_module.prr_df.loc[df_module.prr_df.index.isin(fail_index), PRR_HEAD.FAIL_TEST_ID] = row.TEST_ID
            top_fail_dict[row.TEST_ID] = top_fail_dict.get(row.TEST_ID, 0) + fail_qty
        return top_fail_dict

    @staticmethod
    # @Time()
    def re_cal_top_fail(ptmd: PtmdModule, df_module: DataModule, dtp_unit_df: pd.DataFrame,
                        table: dict = None) -> pd.DataFrame:
        """
        Re-evaluate PASS/FAIL of one test item using the (possibly updated)
        limit information carried by ``ptmd``.

        Side effects: marks failing rows in ``dtp_unit_df`` (FAIL_FLG) and in
        ``df_module.prr_df`` (FAIL_FLAG; plus BIN/FAIL_TEST_ID when ``table``
        is given).

        :param ptmd: limit record; OPT_FLAG/PARM_FLG bits decide whether each
                     limit applies and whether it is inclusive
        :param df_module: data bundle; its prr_df is updated in place
        :param dtp_unit_df: DTP rows of this single test item
        :param table: optional row dict (Calculation.dict()) carrying the
                      HARD_BIN / SOFT_BIN / TEST_ID to stamp onto failing parts
        :return: ``dtp_unit_df`` with FAIL_FLG refreshed
                 (60k row, 40 column, 800ms -> sometimes faster than top_fail)
        """
        # Build the pass conditions; an absent limit contributes no condition.
        logic_and = []
        if not ptmd.OPT_FLAG & PtmdOptFlag.NoLowLimit:
            if ptmd.PARM_FLG & PtmdParmFlag.EqualLowLimit:  # >=
                logic_and.append((dtp_unit_df.RESULT >= ptmd.LO_LIMIT))
            else:  # >
                logic_and.append((dtp_unit_df.RESULT > ptmd.LO_LIMIT))
        if not ptmd.OPT_FLAG & PtmdOptFlag.NoHighLimit:
            if ptmd.PARM_FLG & PtmdParmFlag.EqualHighLimit:  # <=
                logic_and.append((dtp_unit_df.RESULT <= ptmd.HI_LIMIT))
            else:  # <
                logic_and.append((dtp_unit_df.RESULT < ptmd.HI_LIMIT))
        if len(logic_and) == 0:  # no limits at all -> nothing can fail
            return dtp_unit_df
        if len(logic_and) == 1:
            items = logic_and[0]
        else:
            # At most two conditions (low + high) can exist here.
            items = np.logical_and(*logic_and)
        fail_df = dtp_unit_df.loc[~items]
        dtp_unit_df.loc[~items, DTP_HEAD.FAIL_FLG] = FailFlag.FAIL
        fail_loc = df_module.prr_df.index.isin(fail_df.DIE_ID)
        df_module.prr_df.loc[fail_loc, PRR_HEAD.FAIL_FLAG] = FailFlag.FAIL
        if table is None:
            return dtp_unit_df
        # Stamp the bin/test-id bound to this item onto the failing parts.
        df_module.prr_df.loc[fail_loc, PRR_HEAD.HARD_BIN] = table[PRR_HEAD.HARD_BIN]
        df_module.prr_df.loc[fail_loc, PRR_HEAD.SOFT_BIN] = table[PRR_HEAD.SOFT_BIN]
        df_module.prr_df.loc[fail_loc, PRR_HEAD.FAIL_TEST_ID] = table[DTP_HEAD.TEST_ID]
        return dtp_unit_df

    @staticmethod
    @Time()
    def calculation_new_top_fail(df_module: DataModule):
        """
        Re-run the Top-Fail computation after limits were reset.

        TODO:
            @20230204 -> rework the flow: first compute dtp_df FAIL_FLG, then
                simply call calculation_top_fail
            @20230205 -> if BIN values need updating later, can it be done in
                here? could ptmd_df be extended?
        Precision caveat: even when the limits did not change, the fail rate
        computed here may differ from the function above (float round-trip).
        Runtime was expected to be longer -> in practice about the same? The
        plain path above should still be the simpler and faster one.
        Top-Fail is obtained by evaluating fails item by item.
        :param df_module: data bundle; dtp_df is rebuilt, prr_df reset/updated
        :return: {TEST_ID: first-fail qty} (from calculation_top_fail)
        """
        # Work on a copy; every row starts as PASS and is re-judged per item.
        dtp_df = df_module.dtp_df.copy()
        dtp_df.loc[:, DTP_HEAD.FAIL_FLG] = FailFlag.PASS

        # TODO: pending -> update BIN and PART_FLG

        df_module.prr_df.loc[:, PRR_HEAD.FAIL_FLAG] = FailFlag.PASS
        dtp_df.reset_index(inplace=True)
        dtp_df_dict = {}
        new_dtp_df_list = []
        # Split the DTP data once per test item to avoid repeated filtering.
        for test_id, df in dtp_df.groupby(DTP_HEAD.TEST_ID):
            dtp_df_dict[test_id] = df
        for row in df_module.ptmd_df.itertuples():  # type:PtmdModule
            unit_dtp = CapabilityUtils.re_cal_top_fail(
                row,
                df_module,
                dtp_df_dict[row.TEST_ID],
            )
            new_dtp_df_list.append(unit_dtp)
        # Reassemble and restore the (TEST_ID, DIE_ID) index.
        new_dtp_df = pd.concat(new_dtp_df_list)
        new_dtp_df.set_index([DTP_HEAD.TEST_ID, PRR_HEAD.DIE_ID], inplace=True)
        df_module.dtp_df = new_dtp_df
        return CapabilityUtils.calculation_top_fail(df_module)

    @staticmethod
    def calculation_ptr(
            ptmd: PtmdModule, top_fail_qty: int, data_df: pd.DataFrame, all_qty: int
    ) -> Calculation:
        """
        Build the capability summary (mean/std/CPK/...) for one PTR/MPR item.

        Statistics (AVG/STD/MIN/MAX/MEDIAN) are computed from PASS rows only,
        unless every row failed, in which case all rows are used. A disabled
        3x-MAD outlier clipping step is kept below for reference.
        Time-costly step.
        :param top_fail_qty: first-fail qty of this item (numerator of FAIL_RATE)
        :param ptmd: limit/metadata record of this item
        :param data_df: DTP rows of this single item
        :param all_qty: total part count, denominator of the Top-Fail rate
        :return: Calculation record (values rounded for display only; raw
                 values must never be rounded before persisting)
        """

        # def _mad(factor):
        #     """
        #     Outlier clipping at 3x the median absolute deviation,
        #     by CSDN: https://blog.csdn.net/m0_37967652/article/details/122900866
        #     """
        #     me = np.median(factor)
        #     mad = np.median(abs(factor - me))
        #     # upper/lower bounds at 3x MAD
        #     up = me + (3 * 1.4826 * mad)
        #     down = me - (3 * 1.4826 * mad)
        #     # clip the extremes to the bounds
        #     factor = np.where(factor > up, up, factor)
        #     factor = np.where(factor < down, down, factor)
        #     return factor

        # data_df["RESULT"] = _mad(data_df["RESULT"])
        decimal = UiGlobalVariable.GraphPlotFloatRound
        fail_exec = data_df.FAIL_FLG == FailFlag.FAIL
        reject_qty = len(data_df[fail_exec])
        if len(data_df) == reject_qty:
            pass_df = data_df  # every part failed -> fall back to all rows
        else:
            pass_df = data_df[~fail_exec]
        data_mean, data_min, data_max, data_std, data_median = \
            pass_df.RESULT.mean(), pass_df.RESULT.min(), pass_df.RESULT.max(), pass_df.RESULT.std(), \
            pass_df.RESULT.median()
        if data_std == 0:
            # Avoid division by zero in the CPK formula.
            data_std = 1E-05
        cpk = round(min([(ptmd.HI_LIMIT - data_mean) / (3 * data_std),
                         (data_mean - ptmd.LO_LIMIT) / (3 * data_std)]), decimal)
        # Decode limit types from the STDF OPT_FLAG/PARM_FLG bits.
        l_limit_type = LimitType.ThenLowLimit
        if ptmd.OPT_FLAG & PtmdOptFlag.NoLowLimit:
            l_limit_type = LimitType.NoLowLimit
        elif ptmd.PARM_FLG & PtmdParmFlag.EqualLowLimit:
            l_limit_type = LimitType.EqualLowLimit
        h_limit_type = LimitType.ThenHighLimit
        if ptmd.OPT_FLAG & PtmdOptFlag.NoHighLimit:
            h_limit_type = LimitType.NoHighLimit
        elif ptmd.PARM_FLG & PtmdParmFlag.EqualHighLimit:
            h_limit_type = LimitType.EqualHighLimit
        return Calculation(
            TEST_ID=ptmd.TEST_ID,  # unique per test item after merging
            DATAT_TYPE=ptmd.DATAT_TYPE,
            TEST_NUM=ptmd.TEST_NUM,
            TEST_TXT=ptmd.TEST_TXT,
            UNITS=ptmd.UNITS,
            LO_LIMIT=round(ptmd.LO_LIMIT, decimal),
            HI_LIMIT=round(ptmd.HI_LIMIT, decimal),
            AVG=round(data_mean, decimal),
            STD=round(data_std, decimal),
            CPK=abs(cpk),
            MEDIAN=round(data_median, decimal),
            QTY=len(data_df),
            FAIL_QTY=top_fail_qty,
            # NOTE: the top-fail rate must divide by the TOTAL part count,
            # not the tested count; pending update
            FAIL_RATE="{}%".format(round(top_fail_qty / all_qty * 100, 3)),
            REJECT_QTY=reject_qty,
            REJECT_RATE="{}%".format(round(reject_qty / len(data_df) * 100, 3)),
            MIN=round(data_min, decimal),  # note: taken from the PASS rows
            MAX=round(data_max, decimal),  # note: taken from the PASS rows
            LO_LIMIT_TYPE=l_limit_type,
            HI_LIMIT_TYPE=h_limit_type,
            ALL_DATA_MIN=round(data_df.RESULT.min(), decimal),
            ALL_DATA_MAX=round(data_df.RESULT.max(), decimal),
            TEXT=ptmd.TEXT,
            SOFT_BIN=-1,
            SOFT_BIN_NAME="NA",
            HARD_BIN=-1,
            HARD_BIN_NAME="NA",
        )

    @staticmethod
    def calculation_ftr(
            ptmd: PtmdModule, top_fail_qty: int, data_df: pd.DataFrame, all_qty: int
    ) -> Calculation:
        """
        Build the capability summary for one FTR item (FTR data is also
        converted into the PTR data model downstream).

        Only the fail rates are computed; statistics are NaN and MIN/MAX are
        fixed values (-0.1 / 1.1) -- presumably a plot range for 0/1 data,
        TODO confirm.
        :param top_fail_qty: first-fail qty of this item (numerator of FAIL_RATE)
        :param ptmd: limit/metadata record of this item
        :param data_df: DTP rows of this single item
        :param all_qty: total part count, denominator of the Top-Fail rate
        :return: Calculation record
        """
        # Decode limit types from the STDF OPT_FLAG/PARM_FLG bits.
        l_limit_type = LimitType.ThenLowLimit
        if ptmd.OPT_FLAG & PtmdOptFlag.NoLowLimit:
            l_limit_type = LimitType.NoLowLimit
        elif ptmd.PARM_FLG & PtmdParmFlag.EqualLowLimit:
            l_limit_type = LimitType.EqualLowLimit
        h_limit_type = LimitType.ThenHighLimit
        if ptmd.OPT_FLAG & PtmdOptFlag.NoHighLimit:
            h_limit_type = LimitType.NoHighLimit
        elif ptmd.PARM_FLG & PtmdParmFlag.EqualHighLimit:
            h_limit_type = LimitType.EqualHighLimit
        decimal = UiGlobalVariable.GraphPlotFloatRound
        # Rows whose TEST_FLG carries the TestFailed bit count as rejects.
        reject_qty = len(data_df[data_df.TEST_FLG & DtpTestFlag.TestFailed == DtpTestFlag.TestFailed])
        return Calculation(
            TEST_ID=ptmd.TEST_ID,  # unique per test item after merging
            DATAT_TYPE=ptmd.DATAT_TYPE,
            TEST_NUM=ptmd.TEST_NUM,
            TEST_TXT=ptmd.TEST_TXT,
            UNITS=ptmd.UNITS,
            LO_LIMIT=round(ptmd.LO_LIMIT, decimal),
            HI_LIMIT=round(ptmd.HI_LIMIT, decimal),
            AVG=np.nan,
            STD=np.nan,
            CPK=np.nan,
            MEDIAN=np.nan,
            QTY=len(data_df),
            FAIL_QTY=top_fail_qty,
            # NOTE: the top-fail rate must divide by the TOTAL part count,
            # not the tested count; pending update
            FAIL_RATE="{}%".format(round(top_fail_qty / all_qty * 100, 3)),
            REJECT_QTY=reject_qty,
            REJECT_RATE="{}%".format(round(reject_qty / len(data_df) * 100, 3)),
            MIN=-0.1,  # fixed value for the valid (0/1) region
            MAX=1.1,  # fixed value for the valid (0/1) region
            LO_LIMIT_TYPE=l_limit_type,
            HI_LIMIT_TYPE=h_limit_type,
            ALL_DATA_MIN=-0.1,
            ALL_DATA_MAX=1.1,
            TEXT=ptmd.TEXT,
            SOFT_BIN=-1,
            SOFT_BIN_NAME="NA",
            HARD_BIN=-1,
            HARD_BIN_NAME="NA",
        )

    @staticmethod
    @Time()
    def calculation_capability(df_module: DataModule, top_fail_dict: dict) -> List[dict]:
        """
        Build the capability table for every test item.

        Relies on Python dicts preserving insertion order, so the output
        follows the ptmd iteration order.
        :param df_module: data bundle; ptmd_df drives the iteration
        :param top_fail_dict: {TEST_ID: first-fail qty} from the Top-Fail pass
        :return: list of Calculation rows as dicts
        """
        all_qty = len(df_module.prr_df)
        capability_rows: List[dict] = []
        for item in df_module.ptmd_df.itertuples():  # type:PtmdModule
            unit_df = df_module.dtp_df.loc[item.TEST_ID]  # ~10% of the runtime
            if item.DATAT_TYPE in {DatatType.PTR, DatatType.MPR}:
                calc = CapabilityUtils.calculation_ptr(
                    item, top_fail_dict[item.TEST_ID], unit_df, all_qty
                )
            elif item.DATAT_TYPE == DatatType.FTR:
                calc = CapabilityUtils.calculation_ftr(
                    item, top_fail_dict[item.TEST_ID], unit_df, all_qty
                )
            else:
                # Other record types carry no capability data.
                continue
            capability_rows.append(calc.dict())
        return capability_rows

    @staticmethod
    @Time()
    def pre_view_limit(ptmd_df_limit: pd.DataFrame) -> List[Calculation]:
        """
        Produce placeholder Calculation rows for a rough limit review.

        Only the limit columns carry real data; every statistic is filled
        with NaN / sentinel values.
        """
        decimal = UiGlobalVariable.GraphPlotFloatRound
        limit_list: List[Calculation] = []
        for ptmd in ptmd_df_limit.itertuples():  # type:PtmdModule
            # Decode the low-limit type from the STDF flag bits.
            if ptmd.OPT_FLAG & PtmdOptFlag.NoLowLimit:
                lo_type = LimitType.NoLowLimit
            elif ptmd.PARM_FLG & PtmdParmFlag.EqualLowLimit:
                lo_type = LimitType.EqualLowLimit
            else:
                lo_type = LimitType.ThenLowLimit
            # Decode the high-limit type the same way.
            if ptmd.OPT_FLAG & PtmdOptFlag.NoHighLimit:
                hi_type = LimitType.NoHighLimit
            elif ptmd.PARM_FLG & PtmdParmFlag.EqualHighLimit:
                hi_type = LimitType.EqualHighLimit
            else:
                hi_type = LimitType.ThenHighLimit
            limit_list.append(
                Calculation(
                    TEST_ID=ptmd.TEST_ID,  # unique per test item after merging
                    DATAT_TYPE=ptmd.DATAT_TYPE,
                    TEST_NUM=ptmd.TEST_NUM,
                    TEST_TXT=ptmd.TEST_TXT,
                    UNITS=ptmd.UNITS,
                    LO_LIMIT=round(ptmd.LO_LIMIT, decimal),
                    HI_LIMIT=round(ptmd.HI_LIMIT, decimal),
                    AVG=np.nan,
                    STD=np.nan,
                    CPK=np.nan,
                    MEDIAN=np.nan,
                    QTY=-1,
                    FAIL_QTY=-1,
                    FAIL_RATE="NA%",
                    REJECT_QTY=-1,
                    REJECT_RATE="NA%",
                    MIN=-0.1,
                    MAX=1.1,
                    LO_LIMIT_TYPE=lo_type,
                    HI_LIMIT_TYPE=hi_type,
                    ALL_DATA_MIN=-0.1,
                    ALL_DATA_MAX=1.1,
                    TEXT=ptmd.TEXT,
                    SOFT_BIN=-1,
                    SOFT_BIN_NAME="NA",
                    HARD_BIN=-1,
                    HARD_BIN_NAME="NA",
                )
            )
        return limit_list

    @staticmethod
    @Time()
    def pandas_calculation_capability(df_module: DataModule):
        """
        Experimental: derive top-fail counts with pure pandas group-bys.

        NOTE(review): results are only printed, never returned, and
        ``temp_pass`` is unused -- this looks like a prototype kept for
        reference; the used path is ``pandas_calculation_top_fail`` below.
        """
        temp_fail_exec = df_module.dtp_df.FAIL_FLG == FailFlag.FAIL
        # 1. split the rows into fail / pass populations
        temp_fail = df_module.dtp_df[temp_fail_exec].copy()
        temp_pass = df_module.dtp_df[~temp_fail_exec].copy()
        # 2. keep only the first failing row of each die
        temp_fail.reset_index(inplace=True)
        reject_count = temp_fail.groupby("NEW_TEST_ID")[DTP_HEAD.ID].count()
        print(reject_count)
        temp_data = temp_fail.groupby("DIE_ID", sort=False).first()
        top_fail_count = temp_data.groupby("NEW_TEST_ID")[DTP_HEAD.ID].count()
        print(top_fail_count)

    @staticmethod
    @Time()
    def pandas_calculation_top_fail(temp_fail: pd.DataFrame) -> Tuple[pd.Series, pd.Series, pd.DataFrame]:
        """
        Vectorised top-fail accounting over the failing DTP rows.

        NOTE: ``temp_fail`` is mutated (reset_index inplace); callers pass a
        throw-away copy.
        :param temp_fail: the failing DTP rows only
        :return: (reject count per NEW_TEST_ID, first-fail count per
                  NEW_TEST_ID, first failing row of each die)
        """
        # 2. per-item reject count: every failing row counts once
        temp_fail.reset_index(inplace=True)
        reject_count = temp_fail.groupby("NEW_TEST_ID")[DTP_HEAD.ID].count()
        # First failing row of each die attributes the die to exactly one item.
        temp_data = temp_fail.groupby("DIE_ID", sort=False).first()
        top_fail_count = temp_data.groupby("NEW_TEST_ID")[DTP_HEAD.ID].count()
        return reject_count, top_fail_count, temp_data

    @staticmethod
    @Time()
    def new_calculation_capability(df_module: DataModule) -> List[dict]:
        """
        Compute Top-Fail and the capability table in one pass, driven by
        ptmd_df_limit (de-duplicated, unique test items).

        Logic:
            1. everything is done via group-bys, keeping the number of
               group-by passes as low as possible
            2. Top-Fail is derived from the failing rows only and assigned
               back during grouping
            3. admittedly convoluted -- purely to save runtime
        """
        all_qty = len(df_module.prr_df)
        pass_qty = len(df_module.prr_df[df_module.prr_df.FAIL_FLAG == FailFlag.PASS])
        if all_qty != pass_qty:
            # Failing parts exist: split pass/fail rows and run the
            # vectorised top-fail accounting on the failing ones.
            temp_fail_exec = df_module.dtp_df.FAIL_FLG == FailFlag.FAIL
            temp_fail = df_module.dtp_df[temp_fail_exec].copy()
            temp_pass = df_module.dtp_df[~temp_fail_exec].copy()

            pass_group_result = temp_pass.groupby([DTP_HEAD.NEW_TEST_ID], sort=False)[DTP_HEAD.RESULT]
            all_group_result = df_module.dtp_df.groupby([DTP_HEAD.NEW_TEST_ID], sort=False)[DTP_HEAD.RESULT]
            count = all_group_result.count()
            reject_count, top_fail_count, fail_row = CapabilityUtils.pandas_calculation_top_fail(temp_fail)
            # Stamp the first-failed test id onto each failing part.
            df_module.prr_df.loc[df_module.prr_df.index.isin(fail_row.index), PRR_HEAD.FAIL_TEST_ID] = fail_row[
                DTP_HEAD.NEW_TEST_ID]
            df_module.prr_df[PRR_HEAD.FAIL_TEST_ID] = df_module.prr_df[PRR_HEAD.FAIL_TEST_ID].astype(np.int32)
        else:
            # No failing part at all: the pass population IS the whole set.
            pass_group_result = df_module.dtp_df.groupby([DTP_HEAD.NEW_TEST_ID], sort=False)[DTP_HEAD.RESULT]
            all_group_result = pass_group_result
            count = all_group_result.count()
            reject_count, top_fail_count = None, None

        # TODO: 20230530, consider also computing mean etc. from ALL the data:
        # 1. compute STD from all rows
        # 2. then refresh the limits
        # 3. then recompute limits from the STD of the PASS data under the new limits
        # 4. or select the 5% -> 95% range per item for mean/std?
        pass_data_mean, pass_data_min, pass_data_max, pass_data_std, pass_data_median = \
            pass_group_result.mean(), pass_group_result.min(), pass_group_result.max(), pass_group_result.std(), \
            pass_group_result.median()

        all_data_mean, all_data_min, all_data_max, all_data_std, all_data_median = \
            all_group_result.mean(), all_group_result.min(), all_group_result.max(), all_group_result.std(), \
            all_group_result.median()
        decimal = UiGlobalVariable.GraphPlotFloatRound

        capability_list = []
        for ptmd in df_module.ptmd_df_limit.itertuples():  # type:PtmdModule
            # Items with no data rows are skipped.
            if ptmd.NEW_TEST_ID not in count:
                continue
            qty = count.loc[ptmd.NEW_TEST_ID]
            if qty == 0 or np.isnan(qty):
                print("np.isnan 需要将数据反馈用于Debug!")
                continue
            reject_qty, top_fail_qty = 0, 0
            if reject_count is not None and ptmd.NEW_TEST_ID in reject_count:
                reject_qty = reject_count.loc[ptmd.NEW_TEST_ID]
            if top_fail_count is not None and ptmd.NEW_TEST_ID in top_fail_count:
                top_fail_qty = top_fail_count.loc[ptmd.NEW_TEST_ID]

            # Decode limit types from the STDF OPT_FLAG/PARM_FLG bits.
            l_limit_type = LimitType.ThenLowLimit
            if ptmd.OPT_FLAG & PtmdOptFlag.NoLowLimit:
                l_limit_type = LimitType.NoLowLimit
            elif ptmd.PARM_FLG & PtmdParmFlag.EqualLowLimit:
                l_limit_type = LimitType.EqualLowLimit
            h_limit_type = LimitType.ThenHighLimit
            if ptmd.OPT_FLAG & PtmdOptFlag.NoHighLimit:
                h_limit_type = LimitType.NoHighLimit
            elif ptmd.PARM_FLG & PtmdParmFlag.EqualHighLimit:
                h_limit_type = LimitType.EqualHighLimit

            if ptmd.DATAT_TYPE in {DatatType.PTR, DatatType.MPR}:
                # Default to all-rows statistics; switch to PASS-only rows
                # unless the PandasScreen option disables that.
                data_mean = all_data_mean.loc[ptmd.NEW_TEST_ID]
                data_min = all_data_min.loc[ptmd.NEW_TEST_ID]
                data_max = all_data_max.loc[ptmd.NEW_TEST_ID]
                data_std = all_data_std.loc[ptmd.NEW_TEST_ID]
                data_median = all_data_median.loc[ptmd.NEW_TEST_ID]
                if not UiGlobalVariable.PandasScreen and ptmd.NEW_TEST_ID in pass_data_mean:
                    data_mean = pass_data_mean.loc[ptmd.NEW_TEST_ID]
                    data_min = pass_data_min.loc[ptmd.NEW_TEST_ID]
                    data_max = pass_data_max.loc[ptmd.NEW_TEST_ID]
                    data_std = pass_data_std.loc[ptmd.NEW_TEST_ID]
                    data_median = pass_data_median.loc[ptmd.NEW_TEST_ID]

                if data_std == 0:
                    # Avoid division by zero in the CPK formula.
                    data_std = 1E-06
                cpk = round(min([(ptmd.HI_LIMIT - data_mean) / (3 * data_std),
                                 (data_mean - ptmd.LO_LIMIT) / (3 * data_std)]), decimal)
                if cpk < -9998:
                    # Clamp absurd CPK values (e.g. from a missing limit).
                    cpk = -9998
                capability_list.append(
                    Calculation(
                        TEST_ID=ptmd.NEW_TEST_ID,  # unique per test item after merging
                        DATAT_TYPE=ptmd.DATAT_TYPE,
                        TEST_NUM=ptmd.TEST_NUM,
                        TEST_TXT=ptmd.TEST_TXT,
                        UNITS=ptmd.UNITS,
                        LO_LIMIT=round(ptmd.LO_LIMIT, decimal),
                        HI_LIMIT=round(ptmd.HI_LIMIT, decimal),
                        AVG=round(data_mean, decimal),
                        STD=round(data_std, decimal),
                        CPK=cpk,
                        MEDIAN=round(data_median, decimal),
                        QTY=qty,
                        FAIL_QTY=top_fail_qty,
                        # NOTE: the top-fail rate must divide by the TOTAL part
                        # count, not the tested count; pending update
                        FAIL_RATE="{}%".format(round(top_fail_qty / all_qty * 100, 3)),
                        REJECT_QTY=reject_qty,
                        REJECT_RATE="{}%".format(round(reject_qty / qty * 100, 3)),
                        MIN=round(data_min, decimal),  # note: PASS rows (or all rows, see above)
                        MAX=round(data_max, decimal),  # note: PASS rows (or all rows, see above)
                        LO_LIMIT_TYPE=l_limit_type,
                        HI_LIMIT_TYPE=h_limit_type,
                        ALL_DATA_MIN=round(all_data_min.loc[ptmd.NEW_TEST_ID], decimal),
                        ALL_DATA_MAX=round(all_data_max.loc[ptmd.NEW_TEST_ID], decimal),
                        TEXT=ptmd.TEXT,
                        SOFT_BIN=99,
                        SOFT_BIN_NAME="NA",
                        HARD_BIN=99,
                        HARD_BIN_NAME="NA",
                    ).dict()
                )

                continue
            if ptmd.DATAT_TYPE == DatatType.FTR:
                capability_list.append(
                    Calculation(
                        TEST_ID=ptmd.NEW_TEST_ID,  # unique per test item after merging
                        DATAT_TYPE=ptmd.DATAT_TYPE,
                        TEST_NUM=ptmd.TEST_NUM,
                        TEST_TXT=ptmd.TEST_TXT,
                        UNITS=ptmd.UNITS,
                        LO_LIMIT=round(ptmd.LO_LIMIT, decimal),
                        HI_LIMIT=round(ptmd.HI_LIMIT, decimal),
                        AVG=1,
                        STD=0.3,
                        CPK=np.nan,
                        MEDIAN=np.nan,
                        QTY=qty,
                        FAIL_QTY=top_fail_qty,
                        # NOTE: the top-fail rate must divide by the TOTAL part
                        # count, not the tested count; pending update
                        FAIL_RATE="{}%".format(round(top_fail_qty / all_qty * 100, 3)),
                        REJECT_QTY=reject_qty,
                        REJECT_RATE="{}%".format(round(reject_qty / qty * 100, 3)),
                        MIN=-0.1,  # fixed value for the valid (0/1) region
                        MAX=1.1,  # fixed value for the valid (0/1) region
                        LO_LIMIT_TYPE=l_limit_type,
                        HI_LIMIT_TYPE=h_limit_type,
                        ALL_DATA_MIN=-0.1,
                        ALL_DATA_MAX=1.1,
                        TEXT=ptmd.TEXT,
                        SOFT_BIN=99,
                        SOFT_BIN_NAME="NA",
                        HARD_BIN=99,
                        HARD_BIN_NAME="NA",
                    ).dict()
                )

        return capability_list

    @staticmethod
    @Time()
    def new_re_calculation_capability(df_module: DataModule, capability_key_dict: dict):
        """
        Re-judge PASS/FAIL and bins after the limits were edited in the table.

        ``capability_key_dict``: key = row.NEW_TEST_ID, value = the table row
        (Calculation.dict()). Only the dtp data needs to be refreshed:
            1. reset dtp_df FAIL_FLG and prr_df FAIL_FLAG to PASS first
            2. group by NEW_TEST_ID and re-evaluate the PASS/FAIL state
            3. BIN update logic:
                3.1 each TEST_ID is bound to exactly one BIN value
        """
        dtp_df = df_module.dtp_df.copy()
        dtp_df.loc[:, DTP_HEAD.FAIL_FLG] = FailFlag.PASS
        # DONE 20230422: BIN update added (updating PART_FLG is unnecessary)
        df_module.prr_df.loc[:, PRR_HEAD.FAIL_FLAG] = FailFlag.PASS
        df_module.prr_df.loc[:, PRR_HEAD.HARD_BIN] = 1
        df_module.prr_df.loc[:, PRR_HEAD.SOFT_BIN] = 1
        df_module.prr_df.loc[:, PRR_HEAD.FAIL_TEST_ID] = -1

        # Rebuild bin_df from the bins referenced by the edited table rows.
        hbin, sbin = {}, {}
        for each in capability_key_dict.values():
            hbin[each["HARD_BIN"]] = each["HARD_BIN_NAME"]
            sbin[each["SOFT_BIN"]] = each["SOFT_BIN_NAME"]

        bin_df_list = [
            {BIN_HEAD.BIN_TYPE: "SBR", BIN_HEAD.BIN_NUM: 1, BIN_HEAD.BIN_PF: "P", BIN_HEAD.BIN_NAM: "PASS"},
            {BIN_HEAD.BIN_TYPE: "HBR", BIN_HEAD.BIN_NUM: 1, BIN_HEAD.BIN_PF: "P", BIN_HEAD.BIN_NAM: "PASS"},
        ]

        for sb, sb_name in sbin.items():
            bin_df_list.append(
                {BIN_HEAD.BIN_TYPE: "SBR", BIN_HEAD.BIN_NUM: sb, BIN_HEAD.BIN_PF: "F", BIN_HEAD.BIN_NAM: sb_name}
            )
        for hb, hb_name in hbin.items():
            bin_df_list.append(
                {BIN_HEAD.BIN_TYPE: "HBR", BIN_HEAD.BIN_NUM: hb, BIN_HEAD.BIN_PF: "F", BIN_HEAD.BIN_NAM: hb_name}
            )

        df_module.bin_df = pd.DataFrame(bin_df_list)

        # Split the DTP data once per item, then re-judge each item against
        # the edited limits and bins.
        dtp_df.reset_index(inplace=True)
        dtp_df_dict = {}
        new_dtp_df_list = []
        for test_id, df in dtp_df.groupby(DTP_HEAD.NEW_TEST_ID):
            dtp_df_dict[test_id] = df
        for row in df_module.ptmd_df_limit.itertuples():  # type:PtmdModule
            unit_dtp = CapabilityUtils.re_cal_top_fail(
                row,
                df_module,
                dtp_df_dict[row.NEW_TEST_ID],
                capability_key_dict[row.NEW_TEST_ID]
            )
            new_dtp_df_list.append(unit_dtp)
        # Reassemble and restore the (NEW_TEST_ID, DIE_ID) index.
        new_dtp_df = pd.concat(new_dtp_df_list)
        new_dtp_df.set_index([DTP_HEAD.NEW_TEST_ID, PRR_HEAD.DIE_ID], inplace=True)
        df_module.dtp_df = new_dtp_df
